/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <time.h>

#include "../HttpHelper.h"
#include "lib/Latch.h"
#include "lib/LogUtils.h"

DECLARE_LOG_OBJECT()

using namespace pulsar;

// Before https://github.com/apache/pulsar/pull/20948, when message deduplication is enabled, sending chunks
// to the broker will receive send error response.
TEST(ChunkDedupTest, testSendChunks) {
    Client client{"pulsar://localhost:6650"};
    ProducerConfiguration conf;
    conf.setBatchingEnabled(false);
    conf.setChunkingEnabled(true);
    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer("test-send-chunks", conf, producer));

    Latch latch{1};
    std::string value(1024000 /* max message size */ * 100, 'a');
    producer.sendAsync(MessageBuilder().setContent(value).build(),
                       [&latch](Result result, const MessageId& msgId) {
                           LOG_INFO("Send to " << msgId << ": " << result);
                           latch.countdown();
                       });
    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
    client.close();
}

TEST(ChunkDedupTest, testLazyPartitionedProducer) {
    std::string topic = "test-lazy-partitioned-producer-" + std::to_string(time(nullptr));
    Client client{"pulsar://localhost:6650"};
    ProducerConfiguration conf;
    conf.setLazyStartPartitionedProducers(true);
    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));

    constexpr int numPartitions = 3;
    int res =
        makePutRequest("http://localhost:8080/admin/v2/persistent/public/default/" + topic + "/partitions",
                       std::to_string(numPartitions));
    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

    for (int i = 0; i < 10; i++) {
        const auto key = std::to_string(i % numPartitions);
        MessageId msgId;
        producer.send(MessageBuilder().setPartitionKey(key).setContent("msg-" + std::to_string(i)).build(),
                      msgId);
        ASSERT_TRUE(msgId.ledgerId() >= 0 && msgId.entryId() >= 0) << "i: " << i << ", msgId: " << msgId;
    }
    client.close();
}

int main(int argc, char* argv[]) {
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}
